import numpy as np
import matplotlib.pyplot as plt
import wave
from scipy.linalg import toeplitz
from scipy.io.wavfile import write
from scipy.signal import wiener
import IPython.display as ipd
from statsmodels.tsa.stattools import adfuller
from scipy.signal import welch
def read_audio(path):
signal = wave.open(path, 'rb')
signal_data = signal.readframes(-1)
signal_data = np.frombuffer(signal_data, dtype='int16')
signal_data = signal_data / np.max(np.abs(signal_data))
return signal_data, signal.getframerate()
def prep(signal, P):
N = len(signal)
corr = np.correlate(signal, signal, mode='same') / (N-P+1)
corr_vec = corr[P:2*P]
R = toeplitz(corr_vec)
return R, corr_vec
def train(Rx, corr_vec):
wopt = np.linalg.solve(Rx, corr_vec)
return wopt
def process(signal, P, wopt):
N = len(signal)
dhat = np.zeros(N)
Xbuffer = np.zeros(P)
for n in range(N):
Xbuffer = np.roll(Xbuffer, 1)
Xbuffer[0] = signal[n]
dhat[n] = np.dot(wopt, Xbuffer)
return dhat
def wiener_filter(signal, data, length=3):
Rd, data_corr_vec = prep(data, length)
Rx, signal_corr_vec = prep(signal, length)
wopt = train(Rx, data_corr_vec)
return process(signal, length, wopt)
def export_audio(filename, signal, fs):
audio = signal.astype('int16')
write('./audio/'+filename+'.wav', fs, audio)
print('Audio exported successfully!')
signal1, fs1 = read_audio('./audio/conv1.wav')
signal2, fs2 = read_audio('./audio/conv73.wav')
N = min(len(signal1), len(signal2))
signal1 = signal1[:N]
signal2 = signal2[:N]
display(ipd.Audio(signal1, rate=fs1))
display(ipd.Audio(signal2, rate=fs2))
signal = signal1 + signal2
display(ipd.Audio(signal, rate=fs1))
In statistics, unit root test tests whether a time series variable is non-stationary and possesses a unit root. The null hypothesis is generally defined as the presence of a unit root and the alternative hypothesis is either stationarity, trend stationarity or explosive root depending on the test used.
def generate_non_stationary_signal(length=100, trend_strength=0.2):
time = np.arange(length)
stationary_component = np.random.normal(0, 1, size=length)
trend = trend_strength * time
non_stationary_signal = stationary_component + trend
return non_stationary_signal
def check_stationarity(signal):
result = adfuller(signal)
print('ADF Statistic: %f' % result[0])
print('p-value: %f' % result[1])
if result[1] > 0.05:
print('Signal is not stationary')
else:
print('Signal is stationary')
non_stationary_signal = generate_non_stationary_signal()
check_stationarity(non_stationary_signal)
ADF Statistic: -0.132515 p-value: 0.946102 Signal is not stationary
check_stationarity(signal)
ADF Statistic: -56.620644 p-value: 0.000000 Signal is stationary
plt.figure(figsize=(15, 5))
plt.subplot(1, 2, 1)
plt.plot(non_stationary_signal)
plt.title('Non-stationary signal')
plt.subplot(1, 2, 2)
plt.plot(signal)
plt.title('Stationary signal')
plt.show()
def check_gaussianity(signal):
plt.hist(signal, bins='auto')
plt.show()
check_gaussianity(signal1)
check_gaussianity(signal2)
In signal processing, white noise is a random signal having equal intensity at different frequencies, giving it a constant power spectral density.
def plot_psd(signal, fs, title):
f, Pxx = welch(signal, fs=fs)
plt.semilogy(f, Pxx)
plt.title(title)
plt.xlabel('Frequency (Hz)')
plt.ylabel('Power/Frequency (dB/Hz)')
plt.show()
plot_psd(signal1, fs1, 'PSD of signal 1')
plot_psd(signal2, fs2, 'PSD of signal 2')
INTERPRETATION :
The Power Spectral Density (PSD) analysis revealed valuable insights into the frequency characteristics of the signal. Contrary to the expectations of white noise, the PSD exhibited spectral peaks indicating dominant frequencies.
cross_correlation = np.correlate(signal1, signal2, mode='full')[N-1:]
cross_correlation = cross_correlation / np.max(np.abs(cross_correlation))
plt.plot(cross_correlation, label='Cross-correlation')
plt.axhline(0, color='black', linestyle='--', label='Zero')
plt.legend()
plt.show()
max_cross_correlation = np.max(np.abs(cross_correlation))
print('Maximum cross-correlation: ', max_cross_correlation)
Maximum cross-correlation: 1.0
The Wiener filter is a powerful tool in signal processing with various applications due to its ability to enhance the quality of signals in the presence of noise.
SNR_apriori = 10 * np.log10(np.var(signal1) / np.var(signal2))
SNR_apriori
0.6246423779698772
ADDING WHITE NOISE TO SIGNAL 1
white_noise = 0.05*np.random.normal(size=N)
plot_psd(white_noise, fs1, 'PSD of white noise')
plt.hist(white_noise, bins='auto')
(array([1.000e+00, 0.000e+00, 0.000e+00, 0.000e+00, 0.000e+00, 0.000e+00,
1.000e+00, 1.000e+00, 2.000e+00, 0.000e+00, 1.000e+00, 4.000e+00,
2.000e+00, 1.000e+00, 6.000e+00, 2.000e+00, 2.000e+00, 1.100e+01,
1.500e+01, 1.400e+01, 2.700e+01, 2.900e+01, 3.600e+01, 4.000e+01,
5.300e+01, 5.700e+01, 7.800e+01, 8.500e+01, 8.800e+01, 1.260e+02,
1.450e+02, 1.890e+02, 2.020e+02, 2.420e+02, 2.810e+02, 3.190e+02,
3.290e+02, 4.060e+02, 4.500e+02, 4.900e+02, 5.210e+02, 5.880e+02,
6.580e+02, 7.320e+02, 7.390e+02, 8.090e+02, 8.590e+02, 9.700e+02,
9.990e+02, 1.043e+03, 1.042e+03, 1.119e+03, 1.129e+03, 1.161e+03,
1.201e+03, 1.226e+03, 1.152e+03, 1.120e+03, 1.132e+03, 1.101e+03,
1.081e+03, 1.057e+03, 9.970e+02, 9.360e+02, 9.190e+02, 8.180e+02,
8.230e+02, 7.690e+02, 6.720e+02, 5.650e+02, 5.900e+02, 5.710e+02,
4.690e+02, 4.140e+02, 3.880e+02, 3.220e+02, 2.800e+02, 2.290e+02,
2.290e+02, 1.900e+02, 1.470e+02, 1.510e+02, 1.390e+02, 9.600e+01,
8.900e+01, 6.800e+01, 5.500e+01, 4.700e+01, 3.300e+01, 3.400e+01,
2.800e+01, 1.900e+01, 1.300e+01, 9.000e+00, 1.500e+01, 4.000e+00,
6.000e+00, 5.000e+00, 6.000e+00, 3.000e+00, 2.000e+00, 5.000e+00,
0.000e+00, 2.000e+00]),
array([-0.22371948, -0.21971337, -0.21570727, -0.21170117, -0.20769507,
-0.20368897, -0.19968287, -0.19567677, -0.19167066, -0.18766456,
-0.18365846, -0.17965236, -0.17564626, -0.17164016, -0.16763406,
-0.16362796, -0.15962185, -0.15561575, -0.15160965, -0.14760355,
-0.14359745, -0.13959135, -0.13558525, -0.13157915, -0.12757304,
-0.12356694, -0.11956084, -0.11555474, -0.11154864, -0.10754254,
-0.10353644, -0.09953033, -0.09552423, -0.09151813, -0.08751203,
-0.08350593, -0.07949983, -0.07549373, -0.07148763, -0.06748152,
-0.06347542, -0.05946932, -0.05546322, -0.05145712, -0.04745102,
-0.04344492, -0.03943882, -0.03543271, -0.03142661, -0.02742051,
-0.02341441, -0.01940831, -0.01540221, -0.01139611, -0.00739 ,
-0.0033839 , 0.0006222 , 0.0046283 , 0.0086344 , 0.0126405 ,
0.0166466 , 0.0206527 , 0.02465881, 0.02866491, 0.03267101,
0.03667711, 0.04068321, 0.04468931, 0.04869541, 0.05270151,
0.05670762, 0.06071372, 0.06471982, 0.06872592, 0.07273202,
0.07673812, 0.08074422, 0.08475033, 0.08875643, 0.09276253,
0.09676863, 0.10077473, 0.10478083, 0.10878693, 0.11279303,
0.11679914, 0.12080524, 0.12481134, 0.12881744, 0.13282354,
0.13682964, 0.14083574, 0.14484184, 0.14884795, 0.15285405,
0.15686015, 0.16086625, 0.16487235, 0.16887845, 0.17288455,
0.17689066, 0.18089676, 0.18490286, 0.18890896, 0.19291506]),
<BarContainer object of 104 artists>)
sig1_noise = signal1 + white_noise
display(ipd.Audio(signal1, rate=fs1))
display(ipd.Audio(sig1_noise, rate=fs1))
plt.figure(figsize=(15, 5))
plt.plot(sig1_noise, label='Signal 1 + White noise')
plt.plot(signal1, label='Signal 1')
plt.legend()
plt.show()
SNR_apriori = 10 * np.log10(np.mean(sig1_noise**2) / np.mean(white_noise**2))
SNR_apriori
8.413340996526609
filtered_signal = wiener(sig1_noise, mysize=5, noise=np.abs(white_noise)**2)
plt.figure(figsize=(15, 5))
plt.plot(sig1_noise, label='Noisy Signal')
plt.plot(filtered_signal, label='Filtered signal')
plt.plot(signal1, label='Signal 1')
plt.legend()
plt.show()
SNR_apos = 10 * np.log10(np.mean(filtered_signal**2) / np.mean((signal1 - filtered_signal)**2))
print('SNR a posteriori: ', SNR_apos)
SNR a posteriori: 13.470090793600583
Gain_SNR = SNR_apos - SNR_apriori
print('Gain in SNR: ', Gain_SNR)
Gain in SNR: 5.056749797073975
EQMmin = np.mean((signal1 - filtered_signal)**2)
print('Minimum EQM: ', EQMmin)
Minimum EQM: 0.0006559455279819954
display(ipd.Audio(sig1_noise, rate=fs1))
display(ipd.Audio(filtered_signal, rate=fs1))
1. FILTER 2
filtered_signal = wiener(filtered_signal, mysize=5, noise=np.abs(filtered_signal - signal1)**2)
plt.figure(figsize=(15, 5))
plt.plot(sig1_noise, label='Noisy Signal')
plt.plot(filtered_signal, label='Filtered signal')
plt.plot(signal1, label='Signal 1')
plt.legend()
plt.show()
SNR_apos1 = 10 * np.log10(np.mean(filtered_signal**2) / np.mean((signal1 - filtered_signal)**2))
Gain_SNR = SNR_apos1 - SNR_apriori
Gain_SNR
5.748083028944327
EQMmin1 = np.mean((signal1 - filtered_signal)**2)
print('Minimum EQM: ', EQMmin1)
Minimum EQM: 0.0005404017348891428
display(ipd.Audio(filtered_signal, rate=fs1))
2. FILTER 3
filtered_signal = wiener(filtered_signal, mysize=5, noise=np.abs(filtered_signal - signal1)**2)
plt.figure(figsize=(15, 5))
plt.plot(sig1_noise, label='Noisy Signal')
plt.plot(filtered_signal, label='Filtered signal')
plt.plot(signal1, label='Signal 1')
plt.legend()
plt.show()
INTERPRETATION :
The filtered signal is getting closer to the actual signal after each iteration
SNR_apos2 = 10 * np.log10(np.mean(filtered_signal**2) / np.mean((signal1 - filtered_signal)**2))
Gain_SNR = SNR_apos2 - SNR_apriori
Gain_SNR
5.551435189052807
EQMmin = np.mean((signal1 - filtered_signal)**2)
print('Minimum EQM: ', EQMmin)
Minimum EQM: 0.0005536863092488224
display(ipd.Audio(sig1_noise, rate=fs1))
display(ipd.Audio(filtered_signal, rate=fs1))
filtered_signal = wiener(sig1_noise, mysize=5, noise=np.abs(white_noise)**2)
Gain = []
EQM = []
SNR_apos = 10 * np.log10(np.mean(filtered_signal**2) / np.mean((signal1 - filtered_signal)**2))
Gain_SNR = SNR_apos - SNR_apriori
Gain.append(Gain_SNR)
EQM.append(np.mean((signal1 - filtered_signal)**2))
for i in range(10):
filtered_signal = wiener(filtered_signal, mysize=5, noise=np.abs(white_noise)**2)
SNR_apos = 10 * np.log10(np.mean(filtered_signal**2) / np.mean((signal1 - filtered_signal)**2))
Gain_SNR = SNR_apos - SNR_apriori
Gain.append(Gain_SNR)
EQM.append(np.mean((signal1 - filtered_signal)**2))
SIGNAL TO NOISE RATIO (SNR)
plt.figure(figsize=(15, 5))
plt.plot(Gain)
plt.axhline(Gain[0], color='black', linestyle='--', label=f'Starting Gain {Gain[0]} dB')
plt.axhline(np.max(Gain), color='red', linestyle='--', label=f'Maximum Gain {np.max(Gain)} dB')
plt.axvline(np.argmax(Gain), color='green', linestyle='--', label=f'Optimal number of filters : {np.argmax(Gain) + 1}')
plt.legend()
plt.title('Gain in SNR')
plt.xlabel('Number of filters')
plt.ylabel('Gain (dB)')
plt.show()
print("Maximum Gain: ", np.max(Gain))
print("After applying a cascade of "+ str(np.argmax(Gain) + 1) + " Wiener Filters, the maximum gain is achieved")
Maximum Gain: 5.5392522140278935 After applying a cascade of 2 Wiener Filters, the maximum gain is achieved
MINIMUN MEAN SQUARE ERROR (MMSE)
plt.figure(figsize=(15, 5))
plt.plot(EQM)
plt.axhline(EQM[0], color='black', linestyle='--', label=f'Starting EQM {EQM[0]}')
plt.axhline(np.min(EQM), color='red', linestyle='--', label=f'Minimum EQM {np.min(EQM)}')
plt.axvline(np.argmin(EQM), color='red', linestyle='--', label=f'Optimal number of filters : {np.argmin(EQM) + 1}')
plt.legend()
plt.title('Minimum EQM')
plt.xlabel('Number of filters')
plt.ylabel('EQM')
plt.show()
print("Minimum EQM: ", np.min(EQM))
print("After applying a cascade of "+ str(np.argmin(EQM) + 1) + " Wiener Filters, the minimum EQM is achieved")
Minimum EQM: 0.0005512482161780647 After applying a cascade of 2 Wiener Filters, the minimum EQM is achieved
print(((EQM[0] - np.min(EQM))/EQM[0] )*100)
15.96128143841915
INTERPRETATION:
- After applying 3 wiener filters, The filtered signal starts to lose information 🛑
- The cascade of 3 Wiener Filters successfully made us gain around 15% in terms of distance ✅
display(ipd.Audio(sig1_noise, rate=fs1))
display(ipd.Audio(filtered_signal, rate=fs1))
SNR_apriori = 10 * np.log10(np.mean(signal1**2) / np.mean(white_noise**2))
SNR_apriori
7.758944865079046
Gains = []
signals = []
EQMs = []
for order in range(2,10):
filtered_signal = wiener(sig1_noise, mysize=order)
SNR_apos = 10 * np.log10(np.mean(filtered_signal**2) / np.mean((signal1 - filtered_signal)**2))
Gains.append(SNR_apos - SNR_apriori)
EQMs.append(np.mean((signal1 - filtered_signal)**2))
signals.append(filtered_signal)
plt.figure(figsize=(15, 5))
plt.plot(range(2,10), Gains)
plt.title('SNR a posteriori')
plt.xlabel('Order of the filter')
plt.ylabel('SNR (dB)')
plt.axvline(np.argmax(Gains) + 2, color='red', linestyle='--', label=f'Order {np.argmax(Gains) + 2}')
plt.axhline(np.max(Gains), color='black', linestyle='--', label=f'Maximum SNR {np.max(Gains)} dB')
plt.legend()
plt.show()
plt.figure(figsize=(15, 5))
plt.plot(range(2,10), EQMs)
plt.title('EQM')
plt.xlabel('Order of the filter')
plt.ylabel('EQM')
plt.axvline(np.argmin(EQMs) + 2, color='red', linestyle='--', label=f'Order {np.argmin(EQMs) + 2}')
plt.axhline(np.min(EQMs), color='black', linestyle='--', label=f'Minimum EQM {np.min(EQMs)}')
plt.legend()
plt.show()
=> This high sensitivity can result in the extraction of noise components along with the signal of interest, leading to a less accurate representation.
=> LOW VALUES OF SNR
=> Wide window filters result in increased smoothing of the signal : leads to the loss of important signal details, especially in signals with rapid variations
plt.figure(figsize=(15, 5))
plt.plot(sig1_noise, label='Signal In')
plt.plot(signals[0], label='Filtered signal')
plt.legend()
plt.title("LOW ORDER FILTER EFFECT")
plt.show()
plt.figure(figsize=(15, 5))
plt.plot(signal1, label='Data')
plt.plot(signals[-1], label='Filtered signal')
plt.legend()
plt.title("HIGH ORDER FILTER EFFECT")
plt.show()
plt.figure(figsize=(15, 5))
plt.plot(signal1, label='Data')
plt.plot(signals[np.argmax(Gains)], label='Filtered signal')
plt.legend()
plt.title("OPTIMAL ORDER FILTER")
plt.show()
Segmenting the signal allows the Wiener filter to adapt to local characteristics and variations, enhancing its ability to capture and suppress noise in different parts of the signal.
segment_size = 2048
n_segments = len(sig1_noise) // segment_size
limits = np.arange(0, n_segments) * segment_size
plt.figure(figsize=(15, 5))
plt.plot(sig1_noise, label='Signal In')
for limit in limits:
plt.axvline(limit, color='red', linestyle='--')
plt.title('Signal segmentation')
plt.legend()
plt.show()
def fixed_size_window_segmentation(signal, segment_size):
num_segments = len(signal) // segment_size
filtered_segments = np.zeros_like(signal)
for i in range(num_segments):
start_idx = i * segment_size
end_idx = (i + 1) * segment_size
segment = signal[start_idx:end_idx]
filtered_segment = wiener(segment)
filtered_segments[start_idx:end_idx] = filtered_segment
return filtered_segments
SNR_apriori = 10 * np.log10(np.mean(signal1**2) / np.mean(white_noise**2))
SNR_apriori
7.758944865079046
segment_size = 512
filtered_signal = fixed_size_window_segmentation(sig1_noise, segment_size)
SNR_apos = 10 * np.log10(np.mean(filtered_signal**2) / np.mean((signal1 - filtered_signal)**2))
SNR_apos
10.883071453651027
display(ipd.Audio(sig1_noise, rate=fs1))
display(ipd.Audio(filtered_signal, rate=fs1))
sizes = [2**n for n in range(5, 16)]
SEG_SNR = []
SEG_EQM = []
for size in sizes:
filtered_signal = fixed_size_window_segmentation(sig1_noise, size)
SEG_SNR.append(10 * np.log10(np.mean(filtered_signal**2) / np.mean((signal1 - filtered_signal)**2)))
SEG_EQM.append(np.mean((signal1 - filtered_signal)**2))
plt.figure(figsize=(15, 5))
plt.plot(sizes, SEG_SNR)
plt.axhline(np.max(SEG_SNR), color='red', linestyle='--', label=f'Maximum SNR {np.max(SEG_SNR)} dB')
plt.title('SNR a posteriori')
plt.xlabel('Segment size')
plt.ylabel('SNR (dB)')
plt.legend()
plt.show()
plt.figure(figsize=(15, 5))
plt.plot(sizes, SEG_EQM)
plt.axhline(np.min(SEG_EQM), color='red', linestyle='--', label=f'Minimum EQM {np.min(SEG_EQM)}')
plt.title('EQM')
plt.xlabel('Segment size')
plt.ylabel('EQM')
plt.legend()
plt.show()
INTERPRETATION:
The signal's stationnarity is verified since the 1st chapter of this document => The same statistical properties are maintained for whole signal.
=> Overlapping helps reduce edge effects, and spectral leakage.
- Edge effects, also known as windowing artifacts, are distortions that can occur at the boundaries of signal segments when applying window functions.
These effects are particularly relevant when working with finite-duration segments of a continuous signal.
- Spectral Leakage occurs when the frequency content of a signal extends beyond the boundaries of a window.
SNR_apriori = 10 * np.log10(np.mean(sig1_noise**2) / np.mean(white_noise**2))
SNR_apriori
8.413340996526609
def segmentation_with_overlap(signal, window_size=512, overlap_ratio=0.5):
overlap_size = int(window_size * overlap_ratio)
filtered_segments = []
start_indexes = []
for i in range(0, len(signal) - window_size + 1, window_size - overlap_size):
start_idx = i
end_idx = i + window_size
segment = signal[start_idx:end_idx]
filtered_segment = wiener(segment)
filtered_segments.append(filtered_segment)
start_indexes.append(start_idx)
filtered_signal = np.concatenate(filtered_segments[:1] + [filtered_segments[i][overlap_size:] for i in range(1, len(filtered_segments))])
return filtered_signal, start_indexes
overlap_filtered_signal, start_indexes = segmentation_with_overlap(sig1_noise, window_size=512, overlap_ratio=0.5)
plt.figure(figsize=(15, 5))
plt.plot(sig1_noise, label='Signal In')
for start_idx in start_indexes:
plt.axvline(start_idx, color='red', linestyle='--')
plt.title('Signal segmentation')
plt.legend()
plt.show()
display(ipd.Audio(sig1_noise, rate=fs1))
display(ipd.Audio(overlap_filtered_signal, rate=fs1))
SNR_apos = 10 * np.log10(np.mean(overlap_filtered_signal**2) /
np.mean((signal1[:len(overlap_filtered_signal)] - overlap_filtered_signal)**2))
SNR_apos
10.88413275085027
overlap_filtered_signal, start_indexes = segmentation_with_overlap(overlap_filtered_signal, window_size=512, overlap_ratio=0.5)
SNR_apos = 10 * np.log10(np.mean(overlap_filtered_signal**2) / np.mean((signal1[:len(overlap_filtered_signal)] - overlap_filtered_signal)**2))
SNR_apos
11.130301949371006
display(ipd.Audio(signal, rate=fs1))
SNR1_apriori = 10*np.log10(np.mean(signal1**2)/np.mean(signal2**2))
SNR2_apriori = 10*np.log10(np.mean(signal2**2)/np.mean(signal1**2))
print('SNR1 apriori: ', SNR1_apriori)
print('SNR2 apriori: ', SNR2_apriori)
SNR1 apriori: 0.6246423600025764 SNR2 apriori: -0.6246423600025762
==> SNR IS SO LOW !! BAD SEPARATION
d1hat = wiener_filter(signal, signal1, length=3)
d2hat = wiener_filter(signal, signal2, length=3)
display(ipd.Audio(d1hat, rate=fs1))
display(ipd.Audio(d2hat, rate=fs1))
def fixed_size_window_segmentation(signal, data, segment_size):
num_segments = len(signal) // segment_size
filtered_segments = np.zeros_like(signal)
for i in range(num_segments):
start_idx = i * segment_size
end_idx = (i + 1) * segment_size
segment = signal[start_idx:end_idx]
filtered_segment = wiener_filter(segment, data[start_idx:end_idx])
filtered_segments[start_idx:end_idx] = filtered_segment
return filtered_segments
def segmentation_with_overlap(signal, data, filter_size, window_size=512, overlap_ratio=0.5):
overlap_size = int(window_size * overlap_ratio)
filtered_segments = []
for i in range(0, len(signal) - window_size + 1, window_size - overlap_size):
start_idx = i
end_idx = i + window_size
segment = signal[start_idx:end_idx]
filtered_segment = wiener_filter(
segment, data[start_idx:end_idx], length=filter_size)
filtered_segments.append(filtered_segment)
filtered_signal = np.zeros_like(signal)
for i in range(len(filtered_segments)):
start_idx = i * (window_size - overlap_size)
end_idx = start_idx + window_size
filtered_signal[start_idx:end_idx] = filtered_segments[i]
return filtered_signal
d1hat_prime = fixed_size_window_segmentation(signal, signal1, segment_size=512)
d2hat_prime = fixed_size_window_segmentation(signal, signal2, segment_size=512)
display(ipd.Audio(d1hat_prime, rate=fs1))
display(ipd.Audio(d2hat_prime, rate=fs1))
d1hat_second = segmentation_with_overlap(
signal, signal1, 4, window_size=512, overlap_ratio=0.5
)
d2hat_second = segmentation_with_overlap(
signal, signal2, 3, window_size=512, overlap_ratio=0.1
)
display(ipd.Audio(d1hat_second, rate=fs1))
display(ipd.Audio(d2hat_second, rate=fs1))
Inversing Rx is computationally heavy : O(P^3) !!!
Introducing a new method to reduce the complexity to O(P*N) :
In addition to the traditional approach of calculating the Wiener filter coefficients through matrix inversion, an alternative methodology involves directly adjusting the weighting of residuals. This method offers a nuanced perspective on the implementation of the Wiener filter, providing an alternative route to achieve optimal denoising.
PROCEDURE
Compute the local mean L(n) and local variance V(n) using appropriate filtering techniques. These statistics serve as key parameters for subsequent computations.
Calculate the residuals res(n) by subtracting the local mean from the observed signal.
Introduce a weighting factor that dynamically adjusts the influence of the residuals based on the ratio of the estimated noise power to the local variance.
- When V(n) (local variance) is significantly larger than the estimated noise, 1− (noise/V(n)) approaches 1, and the residual term is given more weight. This is desirable in regions where the signal dominates over noise.
- When V(n) is comparable to or smaller than the estimated noise,1− (noise/V(n)) approaches 0, and the residual term is given less weight. This is advantageous in regions where the noise level is relatively high compared to the local variance.
The Wiener filter combines the local mean and the weighted residual to obtain the final output. The weighted residual acts as a correction to the local mean, and the weight is determined by the ratio of the estimated noise to the local variance.
ALGORITHM
lMean = local mean on each segment with the length filter_size
lVar = local variance on each segment with the length filter_size
noise = the estimate power of noise
if noise is None:
res = (data - lMean)
res *= (1 - noise / lVar)
res += lMean
out =
CONSTRAINT !!! SENSITIVITY TO NOISE ESTIMATION
The method involves adjusting residuals based on the ratio of the estimated noise power to the local variance. If the noise estimation is unreliable or if the noise power is consistently overestimated, the performance of the method may be compromised.
INDEPENDENT COMPONENT ANALYSIS (ICA) is a computational method for separating a multivariate signal into additive subcomponents. This is done by assuming that at most one subcomponent is Gaussian and that the subcomponents are statistically independent from each other.ICA is a special case of blind source separation. A common example application is the "COCKTAIL PARTY PROBLEM" of listening in on one person's speech in a noisy room.
from sklearn.decomposition import FastICA
s1 = signal1
s2 = signal2
S = np.c_[s1, s2]
S /= S.std(axis=0)
A = np.array([[0.5, 0.5],
[0.7, 0.3]])
X = np.dot(S, A.T)
plt.figure(figsize=(15, 5))
plt.plot(X.T[0])
[<matplotlib.lines.Line2D at 0x200e814b670>]
display(ipd.Audio(X.T[0], rate=fs1))
plt.figure(figsize=(15, 5))
models = [X, S]
names = ['Observations (mixed signal)',
'True Sources']
colors = ['red', 'steelblue']
for ii, (model, name) in enumerate(zip(models, names), 1):
plt.subplot(2, 1, ii)
plt.title(name)
for sig, color in zip(model.T, colors):
plt.plot(sig, color=color)
plt.tight_layout()
plt.show()
ica = FastICA(n_components=2, whiten="arbitrary-variance")
S_ = ica.fit_transform(X)
A_ = ica.mixing_
assert np.allclose(X, np.dot(S_, A_.T) + ica.mean_)
plt.figure(figsize=(15, 5))
models = [X, S, S_]
names = ['Observations (mixed signal)',
'True Sources',
'ICA recovered signals']
colors = ['red', 'steelblue']
for ii, (model, name) in enumerate(zip(models, names), 1):
plt.subplot(3, 1, ii)
plt.title(name)
for sig, color in zip(model.T, colors):
plt.plot(sig, color=color)
plt.tight_layout()
plt.show()
display(ipd.Audio(S_[:, 0], rate=fs1))
display(ipd.Audio(S_[:, 1], rate=fs1))